package cn.kinoko.component;

import cn.kinoko.config.ConfigProperties;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import lombok.Setter;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RKeys;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.logging.Logger;

/**
 * Bloom filter loader.
 *
 * <p>On startup, when the bloom filter feature is enabled in {@link ConfigProperties},
 * this component counts the keys of the current Redis database, initializes a Redisson
 * bloom filter sized to that count, adds every existing key to it and then builds the
 * local {@link #REF_COUNT} cache. On shutdown the bloom filter is deleted from Redis.
 *
 * <p>Both {@link #BLOOM_FILTER} and {@link #REF_COUNT} are left unassigned by this class
 * when the feature is disabled or when the database contained no keys at startup, so
 * code reading these fields has to be prepared for {@code null}.
 *
 * @author kk
 */
@Getter
@Setter
@Component
public class BloomFilterLoader implements InitializingBean, DisposableBean {

    private static final Logger LOGGER = Logger.getLogger(BloomFilterLoader.class.getName());

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ConfigProperties configProperties;

    /** Shared Redis-backed bloom filter; assigned in {@link #afterPropertiesSet()}. */
    public static RBloomFilter<Object> BLOOM_FILTER;

    /** Local bounded cache of per-key counters; assigned after the filter has been filled. */
    public static Cache<String, Integer> REF_COUNT;

    /**
     * Builds the bloom filter from the keys currently stored in Redis.
     *
     * <p>Returns without doing anything when the feature is disabled or when the
     * database holds no keys.
     *
     * @throws Exception declared by {@link InitializingBean}; any failure of the Redis
     *                   calls made here propagates to the caller
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ConfigProperties.BloomFilterConfig bloomFilterConfig = configProperties.getBloomFilter();
        if (!bloomFilterConfig.isEnable()) {
            return;
        }
        RKeys keys = redissonClient.getKeys();
        long keyCount = keys.count();
        LOGGER.info(keyCount + " keys were scanned in current redisDb");
        if (keyCount == 0) {
            // Nothing to load; the key count is used as the filter's expected insertions.
            return;
        }
        // Initialize the bloom filter.
        String bloomFilterKey = bloomFilterConfig.getKey();
        double tolerateError = bloomFilterConfig.getTolerateError();
        BLOOM_FILTER = redissonClient.getBloomFilter(bloomFilterKey);
        boolean initialized = BLOOM_FILTER.tryInit(keyCount, tolerateError);
        if (!initialized) {
            // tryInit does not touch a filter that already exists (for example one left
            // behind by a process that ended without running destroy()). Surface that
            // instead of silently reusing its old configuration and contents.
            LOGGER.warning("bloomFilter " + bloomFilterKey + " already exists, keeping its existing"
                    + " configuration and contents; requested expected insertions " + keyCount
                    + " and tolerate error " + tolerateError + " were not applied");
        }
        // Add every existing key.
        keys.getKeys().forEach(BLOOM_FILTER::add);
        LOGGER.info(keyCount + " keys were fill in bloomFilter");
        LOGGER.info("bloomFilter provide for " + bloomFilterKey + " and {" + bloomFilterKey + "}:config" +
                " key, " + "tolerate error " + tolerateError);
        // Initialize the "soft" bloom filter: a bounded local cache whose entries expire
        // after not being accessed for the configured number of minutes.
        // NOTE(review): the original comment says the default is 10 minutes; that default
        // lives in ConfigProperties and is not visible here.
        REF_COUNT = Caffeine.newBuilder()
                .initialCapacity(16)
                .maximumSize(1024)
                .expireAfterAccess(Duration.ofMinutes(bloomFilterConfig.getReleaseRetry()))
                .build();
    }

    /**
     * Deletes the bloom filter from Redis on shutdown.
     *
     * <p>Does nothing beyond logging when the feature is disabled or the filter was
     * never created.
     */
    @Override
    public void destroy() {
        LOGGER.info("bloomFilter destroy....");
        ConfigProperties.BloomFilterConfig bloomFilterConfig = configProperties.getBloomFilter();
        if (!bloomFilterConfig.isEnable() || BLOOM_FILTER == null) {
            return;
        }
        BLOOM_FILTER.delete();
    }

}